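/*
 *
 * FileName: ZkNode.java
 * Author:   wx:fdzhangc
 * Date:     2021/11/24 11:41
 * Description: Caches the deserialized data of a single ZooKeeper node and notifies listeners when it changes.
 * History: // modification log
 * <author>      <time>      <version>    <desc>
 * modifier      modified at version      description
 */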
package com.zhangc.zcscm.zookeeper;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.zhangc.zcscm.zookeeper.exception.DataException;
import com.zhangc.zcscm.zookeeper.exception.ZkTimeoutException;

/**
 * Cached, deserialized view of the data stored at one ZooKeeper node path.
 *
 * <p>The node subscribes itself to data changes through the supplied {@link ZkClient}, keeps the
 * most recent deserialized value in memory and forwards every change to the registered
 * {@link DataListener}s.
 *
 * <p>Concurrency as implemented here: updates performed by the change handlers and by the sync
 * routine run under a private lock; the cached value is a {@code volatile} field so readers do not
 * take the lock; listeners are invoked after the lock has been released.
 *
 * @param <T> type the raw node bytes are deserialized into
 * @author wx:fdzhangc
 */
public class ZkNode<T> implements DataCache<T>, IZkDataListener {
    private static final Logger logger = LoggerFactory.getLogger(ZkNode.class);

    /**
     * Timeout passed to the subscribe call when the caller asked to retry until success.
     * NOTE(review): the unit is presumably milliseconds (which would make this about ten years) -
     * confirm against ZkClient#subscribeDataChanges.
     */
    private static final long SYNC_TIMEOUT_UNTIL_SUCCESS = 315360000000L;

    /** Timeout passed to the subscribe call for a single, bounded sync attempt (same unit as above). */
    private static final long SYNC_TIMEOUT_DEFAULT = 30000L;

    /** Client used for subscriptions and background jobs; set to null by {@link #destroy()}. */
    private ZkClient client;
    private final String nodePath;
    /** Converts raw node bytes into {@code T}; set to null by {@link #destroy()}. */
    private Deserializer<T> deserializer;
    /** Last known node value; null when the node is absent, has no data, or was never synced. */
    private volatile T nodeData;
    /** Concrete type is required so listeners can be registered with the atomic addIfAbsent. */
    private final CopyOnWriteArrayList<DataListener<T>> dataListeners = new CopyOnWriteArrayList<DataListener<T>>();
    /** Becomes true once the initial subscribe-and-read has completed. */
    private final AtomicBoolean synced = new AtomicBoolean(false);
    private final boolean watchIfNoNodeExisted;
    private final boolean throwExceptionWhenGetDataIfNotSynced;
    /** Whether the node existed according to the latest sync result or change event. */
    private final AtomicBoolean existed = new AtomicBoolean(false);
    private final boolean permitSyncFastFailIfZKNotAlive;
    /** Serializes cache updates made by the sync routine and the change handlers. */
    private final Lock lock = new ReentrantLock();

    /**
     * Creates a node with {@code watchIfNoNodeExisted = true} and the two remaining flags disabled.
     *
     * @param zkClient     client used to talk to ZooKeeper; must not be null
     * @param nodePath     path of the node; must not be null or blank
     * @param deserializer converts node bytes into {@code T}; must not be null
     * @throws IllegalArgumentException if any argument is invalid
     */
    public ZkNode(ZkClient zkClient, String nodePath, Deserializer<T> deserializer) {
        this(zkClient, nodePath, deserializer, true, false, false);
    }

    /**
     * Creates a node with an explicit {@code watchIfNoNodeExisted} flag and the two remaining flags disabled.
     *
     * @param zkClient             client used to talk to ZooKeeper; must not be null
     * @param nodePath             path of the node; must not be null or blank
     * @param deserializer         converts node bytes into {@code T}; must not be null
     * @param watchIfNoNodeExisted forwarded unchanged to the client's subscribe call
     * @throws IllegalArgumentException if any argument is invalid
     */
    public ZkNode(ZkClient zkClient, String nodePath, Deserializer<T> deserializer, boolean watchIfNoNodeExisted) {
        this(zkClient, nodePath, deserializer, watchIfNoNodeExisted, false, false);
    }

    /**
     * Creates a node with fast-fail syncing disabled.
     *
     * @param zkClient                             client used to talk to ZooKeeper; must not be null
     * @param nodePath                             path of the node; must not be null or blank
     * @param deserializer                         converts node bytes into {@code T}; must not be null
     * @param watchIfNoNodeExisted                 forwarded unchanged to the client's subscribe call
     * @param throwExceptionWhenGetDataIfNotSynced whether {@link #getData()} throws before the first sync
     * @throws IllegalArgumentException if any argument is invalid
     */
    public ZkNode(ZkClient zkClient, String nodePath, Deserializer<T> deserializer, boolean watchIfNoNodeExisted,
            boolean throwExceptionWhenGetDataIfNotSynced) {
        this(zkClient, nodePath, deserializer, watchIfNoNodeExisted, throwExceptionWhenGetDataIfNotSynced, false);
    }

    /**
     * Creates a node with every option given explicitly. No ZooKeeper call is made here; the first
     * read happens in {@link #sync(boolean)}.
     *
     * @param zkClient                             client used to talk to ZooKeeper; must not be null
     * @param path                                 path of the node; must not be null or blank
     * @param deserializer                         converts node bytes into {@code T}; must not be null
     * @param watchIfNoNodeExisted                 forwarded unchanged to the client's subscribe call
     * @param throwExceptionWhenGetDataIfNotSynced whether {@link #getData()} throws before the first sync
     * @param permitSyncFastFailIfZKNotAlive       whether a bounded sync fails immediately when the
     *                                             client reports that it is not alive
     * @throws IllegalArgumentException if {@code zkClient} or {@code deserializer} is null, or
     *                                  {@code path} is null or blank
     */
    public ZkNode(ZkClient zkClient, String path, Deserializer<T> deserializer, boolean watchIfNoNodeExisted,
            boolean throwExceptionWhenGetDataIfNotSynced, boolean permitSyncFastFailIfZKNotAlive) {
        if (zkClient == null) {
            throw new IllegalArgumentException("zkClient must not be null");
        }
        if (path == null || path.trim().isEmpty()) {
            throw new IllegalArgumentException("path must not be null or blank");
        }
        if (deserializer == null) {
            throw new IllegalArgumentException("deserializer must not be null");
        }

        this.client = zkClient;
        this.nodePath = path;
        this.deserializer = deserializer;
        this.watchIfNoNodeExisted = watchIfNoNodeExisted;
        this.throwExceptionWhenGetDataIfNotSynced = throwExceptionWhenGetDataIfNotSynced;
        this.permitSyncFastFailIfZKNotAlive = permitSyncFastFailIfZKNotAlive;
    }

    /**
     * Handles a data-change event: marks the node as existing, replaces the cached value with the
     * deserialized payload and then notifies listeners if the value actually changed.
     *
     * @param dataPath path the event refers to (not used by this implementation)
     * @param data     new raw node content; {@code null} is cached as a null value
     * @throws Exception declared by the original signature; anything thrown by the deserializer propagates
     */
    public void handleDataChange(String dataPath, byte[] data) throws Exception {
        T newData;
        T oldData;

        this.lock.lock();
        try {
            this.existed.set(true);
            newData = data == null ? null : this.deserializer.deserialize(data);
            oldData = this.updateData(newData);
        } finally {
            this.lock.unlock();
        }

        // Listeners run outside the lock so a slow listener cannot block updates.
        this.notifyListeners(newData, oldData);
    }

    /**
     * Handles a node-deleted event: marks the node as absent, clears the cached value and notifies
     * listeners if a non-null value was cached before.
     *
     * @param dataPath path the event refers to (not used by this implementation)
     * @throws Exception declared by the original signature
     */
    public void handleDataDeleted(String dataPath) throws Exception {
        T oldData;

        this.lock.lock();
        try {
            this.existed.set(false);
            oldData = this.updateData(null);
        } finally {
            this.lock.unlock();
        }

        this.notifyListeners(null, oldData);
    }

    /**
     * Subscribes to the node and loads its current value, unless that has already been done.
     *
     * @param untilSuccess selects the long timeout instead of the bounded one and disables the
     *                     fast-fail check
     * @throws ZkTimeoutException if fast-fail is permitted, {@code untilSuccess} is false and the
     *                            client reports it is not alive
     */
    private void _sync(boolean untilSuccess) {
        this.lock.lock();
        try {
            // NOTE(review): this check also fires when the node is already synced - confirm that
            // callers rely on the exception in that case before moving it inside the branch below.
            if (!untilSuccess && !this.client.isAlive() && this.permitSyncFastFailIfZKNotAlive) {
                throw new ZkTimeoutException("ZKClient is not alive,sync fast fail.");
            }

            if (this.synced.get()) {
                return;
            }

            long timeout = untilSuccess ? SYNC_TIMEOUT_UNTIL_SUCCESS : SYNC_TIMEOUT_DEFAULT;
            ZkClient.ReadResult result =
                    this.client.subscribeDataChanges(this.nodePath, this, this.watchIfNoNodeExisted, timeout);
            this.existed.set(result.existed);
            this.nodeData = result.data == null ? null : this.deserializer.deserialize(result.data);
            this.synced.set(true);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Performs the initial sync; on failure a background retry is scheduled before the failure is reported.
     *
     * @param retryUntilSuccess whether the attempt uses the long timeout
     * @throws DataException wrapping whatever made the attempt fail
     */
    public void sync(boolean retryUntilSuccess) throws DataException {
        this.sync(retryUntilSuccess, true);
    }

    /**
     * Performs the initial sync.
     *
     * @param retryUntilSuccess   whether the attempt uses the long timeout
     * @param addBackgroundIfFail whether a failed attempt hands a retry-until-success job to the
     *                            client's background job facility
     * @throws DataException wrapping whatever made the attempt fail (thrown even when a background
     *                       retry was scheduled)
     */
    protected void sync(boolean retryUntilSuccess, boolean addBackgroundIfFail) throws DataException {
        try {
            this._sync(retryUntilSuccess);
        } catch (Exception e) {
            if (addBackgroundIfFail) {
                Runnable retryJob = new Runnable() {
                    @Override
                    public void run() {
                        ZkNode.this._sync(true);
                    }
                };
                this.client.addBackgroundJob("Sync operation", retryJob);
            }

            throw new DataException(e);
        }
    }

    /**
     * Returns the cached node value.
     *
     * @return the cached value, possibly null
     * @throws DataException if the node has not been synced yet and this instance was configured
     *                       to throw in that situation
     */
    public T getData() {
        if (this.throwExceptionWhenGetDataIfNotSynced && !this.synced.get()) {
            throw new DataException("Sync not complete yet.");
        }
        return this._getData();
    }

    /**
     * Returns the cached value without checking the sync state.
     *
     * @return the cached value, possibly null
     */
    protected T _getData() {
        return this.nodeData;
    }

    /**
     * Reports whether the node existed according to the latest sync result or change event.
     *
     * @return the current value of the existence flag
     */
    public boolean existed() {
        return this.existed.get();
    }

    /**
     * Registers a listener through a background job. The job first calls the listener with
     * {@code (currentExpectData, cachedValue)} when the two differ (or unconditionally when
     * {@code notifyListenerIgnoreEquals} is set) and then adds the listener if it is not registered
     * yet. An exception thrown by the listener is logged and does not prevent registration.
     *
     * @param currentExpectData          value the caller currently assumes the node holds
     * @param dataListener               listener to register; must not be null
     * @param notifyListenerIgnoreEquals whether to call the listener even when the values are equal
     * @throws IllegalArgumentException if {@code dataListener} is null
     */
    public void monitor(final T currentExpectData, final DataListener<T> dataListener, final boolean notifyListenerIgnoreEquals) {
        if (dataListener == null) {
            // A null listener would otherwise be stored and fail on every later notification.
            throw new IllegalArgumentException("dataListener must not be null");
        }

        Runnable job = new Runnable() {
            @Override
            public void run() {
                try {
                    // Single read of the volatile field so the compared and the delivered value match.
                    final T current = ZkNode.this.nodeData;
                    if (notifyListenerIgnoreEquals || !ZkNode.this._equals(currentExpectData, current)) {
                        dataListener.dataChanged(currentExpectData, current);
                    }
                } catch (Exception e) {
                    ZkNode.logger.error("Exception", e);
                }

                // Atomic check-and-add; a separate contains()/add() pair could register the
                // listener twice when two jobs run concurrently.
                // NOTE(review): a change arriving between the read above and this registration is
                // not delivered to the listener - confirm whether background jobs can race with
                // change events.
                ZkNode.this.dataListeners.addIfAbsent(dataListener);
            }
        };
        this.client.addBackgroundJob("Monitor operation", job);
    }

    /**
     * Registers a listener, calling it initially only when the cached value differs from {@code currentExpectData}.
     *
     * @param currentExpectData value the caller currently assumes the node holds
     * @param dataListener      listener to register; must not be null
     * @throws IllegalArgumentException if {@code dataListener} is null
     */
    public void monitor(T currentExpectData, DataListener<T> dataListener) {
        this.monitor(currentExpectData, dataListener, false);
    }

    /**
     * Null-safe equality used to decide whether listeners need to be told about a change.
     *
     * @param data1 first value, may be null
     * @param data2 second value, may be null
     * @return true if both are null or {@code data1.equals(data2)}
     */
    protected boolean _equals(T data1, T data2) {
        return data1 == null ? data2 == null : data1.equals(data2);
    }

    /**
     * Unsubscribes from the node and drops the cached value, the client and deserializer
     * references and all listeners. Calling it again after that is a no-op.
     */
    public void destroy() {
        ZkClient zkClient = this.client;
        if (zkClient == null) {
            // Already destroyed; the original code would throw a NullPointerException here.
            return;
        }

        zkClient.unsubscribeDataChanges(this.nodePath, this);
        this.nodeData = null;
        this.client = null;
        this.deserializer = null;
        this.dataListeners.clear();
    }

    /**
     * Replaces the cached value.
     *
     * @param newData value to cache, may be null
     * @return the value cached before the call
     */
    private T updateData(T newData) {
        T previous = this.nodeData;
        this.nodeData = newData;
        return previous;
    }

    /**
     * Calls every registered listener with {@code (oldData, newData)} unless the two values are
     * equal. An exception thrown by one listener is logged and does not stop the others.
     *
     * @param newData value now cached
     * @param oldData value cached before the update
     */
    private void notifyListeners(T newData, T oldData) {
        if (this.dataListeners.isEmpty() || this._equals(oldData, newData)) {
            return;
        }

        for (DataListener<T> listener : this.dataListeners) {
            try {
                listener.dataChanged(oldData, newData);
            } catch (Exception e) {
                logger.error("Exception", e);
            }
        }
    }
}
